{
  "nbformat": 4, 
  "nbformat_minor": 0, 
  "metadata": {
    "colab": {
      "name": "Try Apache Beam - Python", 
      "version": "0.3.2", 
      "provenance": [], 
      "collapsed_sections": [], 
      "toc_visible": true, 
      "include_colab_link": true
    }, 
    "kernelspec": {
      "name": "python2", 
      "display_name": "Python 2"
    }
  }, 
  "cells": [
    {
      "cell_type": "markdown", 
      "metadata": {
        "id": "view-in-github", 
        "colab_type": "text"
      }, 
      "source": [
        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/try-apache-beam-py.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
      ]
    }, 
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License."
      ],
      "outputs": [],
      "metadata": {
        "cellView": "form"
      }
    },
    {
      "metadata": {
        "id": "lNKIMlEDZ_Vw", 
        "colab_type": "text"
      }, 
      "cell_type": "markdown", 
      "source": [
        "# Try Apache Beam - Python\n", 
        "\n", 
        "In this notebook, we set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capatibility Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).\n", 
        "\n", 
        "To navigate through different sections, use the table of contents. From **View**  drop-down list, select **Table of contents**.\n", 
        "\n", 
        "To run a code cell, you can click the **Run cell** button at the top left of the cell, or by select it and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.\n", 
        "\n", 
        "To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb)."
      ]
    }, 
    {
      "metadata": {
        "id": "Fz6KSQ13_3Rr", 
        "colab_type": "text"
      }, 
      "cell_type": "markdown", 
      "source": [
        "# Setup\n", 
        "\n", 
        "First, you need to set up your environment, which includes installing `apache-beam` and downloading a text file from Cloud Storage to your local file system. We are using this file to test your pipeline."
      ]
    }, 
    {
      "metadata": {
        "id": "GOOk81Jj_yUy", 
        "colab_type": "code", 
        "outputId": "d283dfb2-4f51-4fec-816b-f57b0cb9b71c", 
        "colab": {
          "base_uri": "https://localhost:8080/", 
          "height": 170
        }
      }, 
      "cell_type": "code", 
      "source": [
        "# Run and print a shell command.\n", 
        "def run(cmd):\n", 
        "  print('>> {}'.format(cmd))\n", 
        "  !{cmd}\n", 
        "  print('')\n", 
        "\n", 
        "# Install apache-beam.\n", 
        "run('pip install --quiet apache-beam')\n", 
        "\n", 
        "# Copy the input file into the local file system.\n", 
        "run('mkdir -p data')\n", 
        "run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')"
      ], 
      "execution_count": 1, 
      "outputs": [
        {
          "output_type": "stream", 
          "text": [
            ">> pip install --quiet apache-beam\n", 
            "\n", 
            ">> mkdir -p data\n", 
            "\n", 
            ">> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/\n", 
            "Copying gs://dataflow-samples/shakespeare/kinglear.txt...\n", 
            "/ [1 files][153.6 KiB/153.6 KiB]                                                \n", 
            "Operation completed over 1 objects/153.6 KiB.                                    \n", 
            "\n"
          ], 
          "name": "stdout"
        }
      ]
    }, 
    {
      "metadata": {
        "id": "cPvvFB19uXNw", 
        "colab_type": "text"
      }, 
      "cell_type": "markdown", 
      "source": [
        "# Minimal word count\n", 
        "\n", 
        "The following example is the \"Hello, World!\" of data processing, a basic implementation of word count. We're creating a simple data processing pipeline that reads a text file and counts the number of occurrences of every word.\n", 
        "\n", 
        "There are many scenarios where all the data does not fit in memory. Notice that the outputs of the pipeline go to the file system, which allows for large processing jobs in distributed environments."
      ]
    }, 
    {
      "metadata": {
        "id": "oUqfqWyMuIfR", 
        "colab_type": "code", 
        "outputId": "52de90fb-abcc-46c7-bccc-5b0416beb50c", 
        "colab": {
          "base_uri": "https://localhost:8080/", 
          "height": 1173
        }
      }, 
      "cell_type": "code", 
      "source": [
        "import apache_beam as beam\n", 
        "import re\n", 
        "\n", 
        "inputs_pattern = 'data/*'\n", 
        "outputs_prefix = 'outputs/part'\n", 
        "\n", 
        "# Running locally in the DirectRunner.\n", 
        "with beam.Pipeline() as pipeline:\n", 
        "  (\n", 
        "      pipeline\n", 
        "      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n", 
        "      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line))\n", 
        "      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n", 
        "      | 'Group and sum' >> beam.CombinePerKey(sum)\n", 
        "      | 'Format results' >> beam.Map(lambda word_count: str(word_count))\n", 
        "      | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n", 
        "  )\n", 
        "\n", 
        "# Sample the first 20 results, remember there are no ordering guarantees.\n", 
        "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
      ], 
      "execution_count": 2, 
      "outputs": [
        {
          "output_type": "stream", 
          "text": [
            "WARNING:root:Deleting 1 existing files in target path matching: -*-of-%(num_shards)05d\n"
          ], 
          "name": "stderr"
        }, 
        {
          "output_type": "stream", 
          "text": [
            ">> head -n 20 outputs/part-00000-of-*\n", 
            "==> outputs/part-00000-of-00001 <==\n", 
            "(u'canker', 1)\n", 
            "(u'bounty', 2)\n", 
            "(u'provision', 3)\n", 
            "(u'to', 438)\n", 
            "(u'terms', 2)\n", 
            "(u'unnecessary', 2)\n", 
            "(u'tongue', 5)\n", 
            "(u'knives', 1)\n", 
            "(u'Commend', 1)\n", 
            "(u'Hum', 2)\n", 
            "(u'Set', 2)\n", 
            "(u'smell', 6)\n", 
            "(u'dreadful', 3)\n", 
            "(u'frowning', 1)\n", 
            "(u'World', 1)\n", 
            "(u'tike', 1)\n", 
            "(u'yes', 3)\n", 
            "(u'oldness', 1)\n", 
            "(u'boat', 1)\n", 
            "(u\"in's\", 1)\n", 
            "\n", 
            "==> outputs/part-00000-of-00003 <==\n", 
            "wrath: 3\n", 
            "nicely: 2\n", 
            "hall: 1\n", 
            "Sure: 2\n", 
            "legs: 4\n", 
            "ten: 1\n", 
            "yourselves: 1\n", 
            "embossed: 1\n", 
            "poorly: 1\n", 
            "temper: 2\n", 
            "Dismissing: 1\n", 
            "Legitimate: 1\n", 
            "tyrannous: 1\n", 
            "turn: 13\n", 
            "gold: 2\n", 
            "minds: 1\n", 
            "dowers: 2\n", 
            "policy: 1\n", 
            "I: 708\n", 
            "V: 6\n", 
            "\n", 
            "==> outputs/part-00000-of-00004 <==\n", 
            "retinue: 1\n", 
            "stink: 1\n", 
            "beaks: 1\n", 
            "Ten: 1\n", 
            "riots: 2\n", 
            "Service: 1\n", 
            "dealing: 1\n", 
            "stop: 2\n", 
            "detain: 1\n", 
            "beware: 1\n", 
            "pilferings: 1\n", 
            "swimming: 1\n", 
            "The: 124\n", 
            "Been: 1\n", 
            "behavior: 1\n", 
            "impetuous: 1\n", 
            "Thy: 20\n", 
            "Tis: 24\n", 
            "Soldiers: 7\n", 
            "Juno: 1\n", 
            "\n"
          ], 
          "name": "stdout"
        }
      ]
    }, 
    {
      "metadata": {
        "id": "k-HubCrk-h_G", 
        "colab_type": "text"
      }, 
      "cell_type": "markdown", 
      "source": [
        "# Word count with comments\n", 
        "\n", 
        "Below is mostly the same code as above, but with comments explaining every line in more detail."
      ]
    }, 
    {
      "metadata": {
        "id": "x_D7sxUHFzUp", 
        "colab_type": "code", 
        "outputId": "44c926df-aa4a-4bea-9247-27c7cb537717", 
        "colab": {
          "base_uri": "https://localhost:8080/", 
          "height": 1173
        }
      }, 
      "cell_type": "code", 
      "source": [
        "import apache_beam as beam\n", 
        "import re\n", 
        "\n", 
        "inputs_pattern = 'data/*'\n", 
        "outputs_prefix = 'outputs/part'\n", 
        "\n", 
        "# Running locally in the DirectRunner.\n", 
        "with beam.Pipeline() as pipeline:\n", 
        "  # Store the word counts in a PCollection.\n", 
        "  # Each element is a tuple of (word, count) of types (str, int).\n", 
        "  word_counts = (\n", 
        "      # The input PCollection is an empty pipeline.\n", 
        "      pipeline\n", 
        "\n", 
        "      # Read lines from a text file.\n", 
        "      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)\n", 
        "      # Element type: str - text line\n", 
        "\n", 
        "      # Use a regular expression to iterate over all words in the line.\n", 
        "      # FlatMap will yield an element for every element in an iterable.\n", 
        "      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r\"[a-zA-Z']+\", line))\n", 
        "      # Element type: str - word\n", 
        "\n", 
        "      # Create key-value pairs where the value is 1, this way we can group by\n", 
        "      # the same word while adding those 1s and get the counts for every word.\n", 
        "      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))\n", 
        "      # Element type: (str, int) - key: word, value: 1\n", 
        "\n", 
        "      # Group by key while combining the value using the sum() function.\n", 
        "      | 'Group and sum' >> beam.CombinePerKey(sum)\n", 
        "      # Element type: (str, int) - key: word, value: counts\n", 
        "  )\n", 
        "\n", 
        "  # We can process a PCollection through other pipelines too.\n", 
        "  (\n", 
        "      # The input PCollection is the word_counts created from the previous step.\n", 
        "      word_counts\n", 
        "\n", 
        "      # Format the results into a string so we can write them to a file.\n", 
        "      | 'Format results' >> beam.Map(lambda word_count: str(word_count))\n", 
        "      # Element type: str - text line\n", 
        "\n", 
        "      # Finally, write the results to a file.\n", 
        "      | 'Write results' >> beam.io.WriteToText(outputs_prefix)\n", 
        "  )\n", 
        "\n", 
        "# Sample the first 20 results, remember there are no ordering guarantees.\n", 
        "run('head -n 20 {}-00000-of-*'.format(outputs_prefix))"
      ], 
      "execution_count": 3, 
      "outputs": [
        {
          "output_type": "stream", 
          "text": [
            "WARNING:root:Deleting 1 existing files in target path matching: -*-of-%(num_shards)05d\n"
          ], 
          "name": "stderr"
        }, 
        {
          "output_type": "stream", 
          "text": [
            ">> head -n 20 outputs/part-00000-of-*\n", 
            "==> outputs/part-00000-of-00001 <==\n", 
            "(u'canker', 1)\n", 
            "(u'bounty', 2)\n", 
            "(u'provision', 3)\n", 
            "(u'to', 438)\n", 
            "(u'terms', 2)\n", 
            "(u'unnecessary', 2)\n", 
            "(u'tongue', 5)\n", 
            "(u'knives', 1)\n", 
            "(u'Commend', 1)\n", 
            "(u'Hum', 2)\n", 
            "(u'Set', 2)\n", 
            "(u'smell', 6)\n", 
            "(u'dreadful', 3)\n", 
            "(u'frowning', 1)\n", 
            "(u'World', 1)\n", 
            "(u'tike', 1)\n", 
            "(u'yes', 3)\n", 
            "(u'oldness', 1)\n", 
            "(u'boat', 1)\n", 
            "(u\"in's\", 1)\n", 
            "\n", 
            "==> outputs/part-00000-of-00003 <==\n", 
            "wrath: 3\n", 
            "nicely: 2\n", 
            "hall: 1\n", 
            "Sure: 2\n", 
            "legs: 4\n", 
            "ten: 1\n", 
            "yourselves: 1\n", 
            "embossed: 1\n", 
            "poorly: 1\n", 
            "temper: 2\n", 
            "Dismissing: 1\n", 
            "Legitimate: 1\n", 
            "tyrannous: 1\n", 
            "turn: 13\n", 
            "gold: 2\n", 
            "minds: 1\n", 
            "dowers: 2\n", 
            "policy: 1\n", 
            "I: 708\n", 
            "V: 6\n", 
            "\n", 
            "==> outputs/part-00000-of-00004 <==\n", 
            "retinue: 1\n", 
            "stink: 1\n", 
            "beaks: 1\n", 
            "Ten: 1\n", 
            "riots: 2\n", 
            "Service: 1\n", 
            "dealing: 1\n", 
            "stop: 2\n", 
            "detain: 1\n", 
            "beware: 1\n", 
            "pilferings: 1\n", 
            "swimming: 1\n", 
            "The: 124\n", 
            "Been: 1\n", 
            "behavior: 1\n", 
            "impetuous: 1\n", 
            "Thy: 20\n", 
            "Tis: 24\n", 
            "Soldiers: 7\n", 
            "Juno: 1\n", 
            "\n"
          ], 
          "name": "stdout"
        }
      ]
    }
  ]
}